/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.runners.gaia.translation.functions;



import com.bff.gaia.unified.runners.core.*;

import com.bff.gaia.unified.runners.core.DoFnRunner;

import com.bff.gaia.unified.runners.core.DoFnRunners;

import com.bff.gaia.unified.runners.core.InMemoryStateInternals;

import com.bff.gaia.unified.runners.core.InMemoryTimerInternals;

import com.bff.gaia.unified.runners.core.StateInternals;

import com.bff.gaia.unified.runners.core.StateNamespace;

import com.bff.gaia.unified.runners.core.StateNamespaces;

import com.bff.gaia.unified.runners.core.TimerInternals;

import com.bff.gaia.unified.runners.core.construction.SerializablePipelineOptions;

import com.bff.gaia.unified.runners.gaia.GaiaPipelineOptions;

import com.bff.gaia.unified.runners.gaia.metrics.DoFnRunnerWithMetricsUpdate;

import com.bff.gaia.unified.runners.gaia.translation.utils.GaiaClassloading;

import com.bff.gaia.unified.sdk.coders.Coder;

import com.bff.gaia.unified.sdk.options.PipelineOptions;

import com.bff.gaia.unified.sdk.transforms.DoFn;

import com.bff.gaia.unified.sdk.transforms.DoFnSchemaInformation;

import com.bff.gaia.unified.sdk.transforms.ParDo;

import com.bff.gaia.unified.sdk.transforms.reflect.DoFnInvoker;

import com.bff.gaia.unified.sdk.transforms.reflect.DoFnInvokers;

import com.bff.gaia.unified.sdk.transforms.windowing.BoundedWindow;

import com.bff.gaia.unified.sdk.util.WindowedValue;

import com.bff.gaia.unified.sdk.values.KV;

import com.bff.gaia.unified.sdk.values.PCollectionView;

import com.bff.gaia.unified.sdk.values.TupleTag;

import com.bff.gaia.unified.sdk.values.WindowingStrategy;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.Lists;

import com.bff.gaia.api.common.functions.RichGroupReduceFunction;

import com.bff.gaia.api.common.functions.RuntimeContext;

import com.bff.gaia.configuration.Configuration;

import com.bff.gaia.util.Collector;

import org.joda.time.Instant;



import java.util.Iterator;

import java.util.List;

import java.util.Map;

import java.util.Optional;



import static com.bff.gaia.util.Preconditions.checkArgument;



/**
 * A {@link RichGroupReduceFunction} for stateful {@link ParDo} in Gaia Batch Runner.
 *
 * <p>Each call to {@link #reduce} handles one group of input elements. State and timers for the
 * group are kept in freshly created in-memory structures, keyed by the key of the first element of
 * the group. Once every element has been processed, the input watermark and both processing-time
 * clocks are advanced to {@link BoundedWindow#TIMESTAMP_MAX_VALUE} and all timers that became
 * eligible are fired before the bundle is finished.
 *
 * @param <K> key type of the input elements
 * @param <V> value type of the input elements
 * @param <OutputT> main output type of the wrapped {@link DoFn}
 */
public class GaiaStatefulDoFnFunction<K, V, OutputT>
    extends RichGroupReduceFunction<WindowedValue<KV<K, V>>, WindowedValue<OutputT>> {

  private final DoFn<KV<K, V>, OutputT> dofn;
  private final String stepName;
  private final WindowingStrategy<?, ?> windowingStrategy;
  private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
  private final SerializablePipelineOptions serializedOptions;
  private final Map<TupleTag<?>, Integer> outputMap;
  private final TupleTag<OutputT> mainOutputTag;
  private final Coder<KV<K, V>> inputCoder;
  private final Map<TupleTag<?>, Coder<?>> outputCoderMap;
  private final DoFnSchemaInformation doFnSchemaInformation;

  /** Assigned in {@link #open}; remains {@code null} if {@code open} was never called. */
  private transient DoFnInvoker<KV<K, V>, OutputT> doFnInvoker;

  /**
   * Creates the function. All arguments are stored as given; only {@code pipelineOptions} is
   * wrapped, in a {@link SerializablePipelineOptions}.
   *
   * @param dofn the {@link DoFn} to run for every group
   * @param stepName name handed to {@link DoFnRunnerWithMetricsUpdate} when metrics are enabled
   * @param windowingStrategy windowing strategy handed to the {@link DoFnRunner}
   * @param sideInputs side input views and their windowing strategies, handed to the side input
   *     reader
   * @param pipelineOptions pipeline options; read as {@link GaiaPipelineOptions} to decide whether
   *     metrics are enabled
   * @param outputMap output tags mapped to integers; when it holds exactly one entry a
   *     single-output manager is used, otherwise a multi-output manager receives this map
   * @param mainOutputTag tag of the main output
   * @param inputCoder coder of the input elements
   * @param outputCoderMap coders of the outputs, by tag
   * @param doFnSchemaInformation schema information handed to the {@link DoFnRunner}
   */
  public GaiaStatefulDoFnFunction(
      DoFn<KV<K, V>, OutputT> dofn,
      String stepName,
      WindowingStrategy<?, ?> windowingStrategy,
      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
      PipelineOptions pipelineOptions,
      Map<TupleTag<?>, Integer> outputMap,
      TupleTag<OutputT> mainOutputTag,
      Coder<KV<K, V>> inputCoder,
      Map<TupleTag<?>, Coder<?>> outputCoderMap,
      DoFnSchemaInformation doFnSchemaInformation) {

    this.dofn = dofn;
    this.stepName = stepName;
    this.windowingStrategy = windowingStrategy;
    this.sideInputs = sideInputs;
    this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
    this.outputMap = outputMap;
    this.mainOutputTag = mainOutputTag;
    this.inputCoder = inputCoder;
    this.outputCoderMap = outputCoderMap;
    this.doFnSchemaInformation = doFnSchemaInformation;
  }

  /**
   * Runs the wrapped {@link DoFn} over one group of elements as a single bundle.
   *
   * <p>The sequence is: start the bundle, process every element of {@code values} in iteration
   * order, advance the in-memory timer clocks to {@link BoundedWindow#TIMESTAMP_MAX_VALUE}, fire
   * all eligible timers, finish the bundle.
   *
   * @param values the elements of the group; the first element is read unconditionally, so the
   *     iterable must not be empty
   * @param out collector that receives the produced elements
   * @throws Exception if bundle processing or timer firing fails
   */
  @Override
  public void reduce(
      Iterable<WindowedValue<KV<K, V>>> values, Collector<WindowedValue<OutputT>> out)
      throws Exception {
    final RuntimeContext runtimeContext = getRuntimeContext();
    final DoFnRunners.OutputManager outputManager = createOutputManager(out);

    final Iterator<WindowedValue<KV<K, V>>> iterator = values.iterator();

    // The first value is needed up front: its key initializes the state internals. A first value
    // is guaranteed to exist, otherwise reduce() would not have been called.
    WindowedValue<KV<K, V>> currentValue = iterator.next();
    final K key = currentValue.getValue().getKey();

    final InMemoryStateInternals<K> stateInternals = InMemoryStateInternals.forKey(key);

    // Used with Batch, we know that all the data is available for this key. We can't use the
    // timer manager from the context because it doesn't exist. So we create one and advance
    // time to the end after processing all elements.
    final InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
    // Read the wall clock once so both processing-time clocks start at the same instant.
    final Instant now = Instant.now();
    timerInternals.advanceProcessingTime(now);
    timerInternals.advanceSynchronizedProcessingTime(now);

    // Every tag known to outputMap is handed to the runner as an additional output tag.
    final List<TupleTag<?>> additionalOutputTags = Lists.newArrayList(outputMap.keySet());

    DoFnRunner<KV<K, V>, OutputT> doFnRunner =
        DoFnRunners.simpleRunner(
            serializedOptions.get(),
            dofn,
            new GaiaSideInputReader(sideInputs, runtimeContext),
            outputManager,
            mainOutputTag,
            additionalOutputTags,
            new GaiaNoOpStepContext() {
              @Override
              public StateInternals stateInternals() {
                return stateInternals;
              }

              @Override
              public TimerInternals timerInternals() {
                return timerInternals;
              }
            },
            inputCoder,
            outputCoderMap,
            windowingStrategy,
            doFnSchemaInformation);

    if (serializedOptions.get().as(GaiaPipelineOptions.class).getEnableMetrics()) {
      doFnRunner = new DoFnRunnerWithMetricsUpdate<>(stepName, doFnRunner, runtimeContext);
    }

    doFnRunner.startBundle();

    doFnRunner.processElement(currentValue);
    while (iterator.hasNext()) {
      currentValue = iterator.next();
      doFnRunner.processElement(currentValue);
    }

    // Finish any pending windows by advancing the input watermark to infinity.
    timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);

    // Finally, advance the processing time to infinity to fire any timers.
    timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
    timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);

    fireEligibleTimers(timerInternals, doFnRunner);

    doFnRunner.finishBundle();
  }

  /**
   * Chooses the output manager for {@code out}: a single-output manager when {@code outputMap}
   * holds exactly one entry, otherwise a multi-output manager that also receives {@code
   * outputMap}.
   */
  @SuppressWarnings({"unchecked", "rawtypes"})
  private DoFnRunners.OutputManager createOutputManager(Collector<WindowedValue<OutputT>> out) {
    if (outputMap.size() == 1) {
      return new GaiaDoFnFunction.DoFnOutputManager(out);
    }
    // There are additional outputs. The collector is passed through a raw cast, exactly as
    // before; presumably the multi-output manager declares a collector of a different element
    // type than WindowedValue<OutputT> -- NOTE(review): confirm against GaiaDoFnFunction.
    return new GaiaDoFnFunction.MultiDoFnOutputManager((Collector) out, outputMap);
  }

  /**
   * Drains {@code timerInternals}, firing event-time, processing-time and synchronized
   * processing-time timers (in that order) until a full pass removes no timer. Repeating the pass
   * picks up timers that were registered while earlier timers were being fired.
   */
  private void fireEligibleTimers(
      InMemoryTimerInternals timerInternals, DoFnRunner<KV<K, V>, OutputT> runner)
      throws Exception {

    boolean firedAny;
    do {
      firedAny = false;
      TimerInternals.TimerData timer;

      while ((timer = timerInternals.removeNextEventTimer()) != null) {
        firedAny = true;
        fireTimer(timer, runner);
      }
      while ((timer = timerInternals.removeNextProcessingTimer()) != null) {
        firedAny = true;
        fireTimer(timer, runner);
      }
      while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) != null) {
        firedAny = true;
        fireTimer(timer, runner);
      }
    } while (firedAny);
  }

  /**
   * Delivers a single timer to {@code doFnRunner}. The timer's namespace must be a {@link
   * StateNamespaces.WindowNamespace}; its window is passed to the runner together with the timer's
   * id, timestamp and time domain.
   */
  private void fireTimer(TimerInternals.TimerData timer, DoFnRunner<KV<K, V>, OutputT> doFnRunner) {
    StateNamespace namespace = timer.getNamespace();
    checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
    BoundedWindow window = ((StateNamespaces.WindowNamespace<?>) namespace).getWindow();
    doFnRunner.onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain());
  }

  /**
   * Obtains the {@link DoFnInvoker} for the wrapped {@link DoFn} through {@link
   * DoFnInvokers#tryInvokeSetupFor} and keeps it for {@link #close}.
   *
   * @param parameters not used by this implementation
   */
  @Override
  public void open(Configuration parameters) throws Exception {
    doFnInvoker = DoFnInvokers.tryInvokeSetupFor(dofn);
  }

  /**
   * Invokes teardown on the {@link DoFnInvoker} if {@link #open} assigned one, and in every case
   * (even when teardown throws) calls {@link GaiaClassloading#deleteStaticCaches()}.
   */
  @Override
  public void close() throws Exception {
    try {
      Optional.ofNullable(doFnInvoker).ifPresent(DoFnInvoker::invokeTeardown);
    } finally {
      GaiaClassloading.deleteStaticCaches();
    }
  }
}